/*
 * @Author: thepoy
 * @Email: thepoy@163.com
 * @File Name: rocketmq.go
 * @Created: 2021-05-07 15:14:41
 * @Modified: 2021-05-08 15:24:11
 */

package rocketmq

import (
	"context"
	"core/canal"
	"core/es"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"strings"

	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/tidwall/gjson"
)

const (
	topic         string = "blog_articles"
	consumerGroup string = "blog"
)

var (
	server string
	err    error
	Host   string
)

func init() {
	Host = os.Getenv("HOST")
	if Host == "" {
		Host = "localhost"
	}
	server = Host + ":9876"
}

// ChangedData 变化的数据的结构体。
//
// 因为本示例使用的不是标准 json 库处理数据，所以此结构体没有用到。
// 这并不意味着这个结构体没有用，使用此结构体在后续的一些操作中会方便一些，只是性能稍差。
// 在不需要很高性能的场景下使用此结构体更适合。
type ChangedData struct {
	// 变化的文档集合
	Data []es.Document `json:"data"`
	// 发生变化的数据库
	Database string `json:"database"`
	// 数据库内执行时间
	ES uint64 `json:"es"`
	// 就是 id
	ID uint `json:"id"`
	// 是否为 DDL 语句，create database、create table、alter table
	IsDDL bool `json:"isDdl"`
	// 表结构的字段类型
	MysqlType map[string]string `json:"mysqlType"`
	// 主键名称
	PrimaryKeyNames []string `json:"pkNames"`
	// sql 语句
	SQL string `json:"sql"`
	// sql 语句类型
	SqlType map[string]uint `json:"sqlType"`
	// 表名称
	Table string `json:"table"`
	// 操作类型，(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等
	Type string `json:"type"`
	// 数据库内解析时间
	Timestamp uint `json:"ts"`
	// 旧数据
	Old []map[string]string `json:"old"`
}

// MustUnmarshal 反序列化 ChangedData
func MustUnmarshal(body []byte) *ChangedData {
	var data ChangedData
	err := json.Unmarshal(body, &data)
	if err != nil {
		log.Println(err)
		return nil
	} else {
		return &data
	}
}

// Consume 使用 rocketmq 消费消息
func Consume(ctx context.Context) error {
	c, err := consumer.NewPushConsumer(
		consumer.WithGroupName(consumerGroup),
		consumer.WithNameServer([]string{server}),
		consumer.WithConsumerModel(consumer.Clustering),
	)
	if err != nil {
		return err
	}

	err = c.Subscribe(
		topic,
		consumer.MessageSelector{},
		func(c context.Context, me ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
			for _, msg := range me {
				body := gjson.Parse(string(msg.Body))
				data := body.Get("data").Array()
				old := body.Get("old").Array()
				canalTypeStr := body.Get("type").String()
				canalType := canal.CanalType(canalTypeStr)

				// TODO: changes 长度和 old 长度是否一样？
				for i := 0; i < len(data); i++ {
					changItem := data[i]

					id := changItem.Get("id").String()

					switch canalType {
					case canal.DELETE:
						log.Println("即将删除文档 ", id)

						err = deleteDocument(id)
						if err != nil {
							log.Fatalln("删除时出错：", err)
						}

						log.Println("已删除：", changItem.String())
					case canal.UPDATE:
						oldItem := old[i].Value().(map[string]interface{})

						newData := make(map[string]string)

						for k := range oldItem {
							newData[k] = changItem.Get(k).String()
						}

						log.Println("文档已存在，即将更新...")
						ok, err := updateDocument(id, newData)
						if err != nil {
							log.Fatalln(err)
						}

						if !ok {
							log.Println("更新失败")
							continue
						}

						log.Printf("已更新文档：id=%s, new-data=%v", id, newData)
					case canal.INSERT:
						err = storeDocument(changItem.Value().(map[string]interface{}))
						if err != nil {
							log.Fatalln("创建文档失败：", err)
						}
						log.Println("已创建新的文档：", changItem.Value().(map[string]interface{}))
					default:
						log.Fatal("未知操作", canalType)
					}

				}
			}
			return consumer.ConsumeSuccess, nil
		})
	if err != nil {
		return err
	}

	err = c.Start()
	if err != nil {
		return err
	}

	select {
	case <-ctx.Done():
		fmt.Println(strings.Repeat("*", 60))
		fmt.Println("shutdown consumer")
		fmt.Println(strings.Repeat("*", 60))
	}

	err = c.Shutdown()
	if err != nil {
		return err
	}

	return nil
}
